/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.mailbox.cassandra;

import static org.apache.james.util.FunctionalUtils.negate;

import java.util.Optional;
import java.util.function.Predicate;

import javax.inject.Inject;

import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.CassandraACLMapper;
import org.apache.james.mailbox.cassandra.mail.CassandraApplicableFlagDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentOwnerDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraDeletedMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraFirstUnseenDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
import org.apache.james.mailbox.events.Event;
import org.apache.james.mailbox.events.Group;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.util.streams.Limit;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * This listener cleans Cassandra metadata up. It retrieves dandling unreferenced metadata after the delete operation
 * had been conducted out. Then it deletes the lower levels first so that upon failures undeleted metadata can still be
 * reached.
 *
 * This cleanup is not needed for strict correctness from a MailboxManager point of view thus it could be carried out
 * asynchronously, via mailbox listeners so that it can be retried.
 *
 * Mailbox listener failures lead to eventBus retrying their execution, it ensures the result of the deletion to be
 * idempotent.
 */
public class DeleteMessageListener implements MailboxListener.GroupMailboxListener {
    private static final Optional<CassandraId> ALL_MAILBOXES = Optional.empty();

    public static class DeleteMessageListenerGroup extends Group {

    }

    private final CassandraMessageIdToImapUidDAO imapUidDAO;
    private final CassandraMessageIdDAO messageIdDAO;
    private final CassandraMessageDAO messageDAO;
    private final CassandraAttachmentDAOV2 attachmentDAO;
    private final CassandraAttachmentOwnerDAO ownerDAO;
    private final CassandraAttachmentMessageIdDAO attachmentMessageIdDAO;
    private final CassandraACLMapper aclMapper;
    private final CassandraUserMailboxRightsDAO rightsDAO;
    private final CassandraApplicableFlagDAO applicableFlagDAO;
    private final CassandraFirstUnseenDAO firstUnseenDAO;
    private final CassandraDeletedMessageDAO deletedMessageDAO;
    private final CassandraMailboxCounterDAO counterDAO;
    private final CassandraMailboxRecentsDAO recentsDAO;

    @Inject
    public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
                                 CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO,
                                 CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper,
                                 CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO,
                                 CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO) {
        this.imapUidDAO = imapUidDAO;
        this.messageIdDAO = messageIdDAO;
        this.messageDAO = messageDAO;
        this.attachmentDAO = attachmentDAO;
        this.ownerDAO = ownerDAO;
        this.attachmentMessageIdDAO = attachmentMessageIdDAO;
        this.aclMapper = aclMapper;
        this.rightsDAO = rightsDAO;
        this.applicableFlagDAO = applicableFlagDAO;
        this.firstUnseenDAO = firstUnseenDAO;
        this.deletedMessageDAO = deletedMessageDAO;
        this.counterDAO = counterDAO;
        this.recentsDAO = recentsDAO;
    }

    @Override
    public Group getDefaultGroup() {
        return new DeleteMessageListenerGroup();
    }

    @Override
    public boolean isHandling(Event event) {
        return event instanceof Expunged || event instanceof MailboxDeletion;
    }

    @Override
    public void event(Event event) {
        if (event instanceof Expunged) {
            Expunged expunged = (Expunged) event;

            handleMessageDeletion(expunged)
                .block();
        }
        if (event instanceof MailboxDeletion) {
            MailboxDeletion mailboxDeletion = (MailboxDeletion) event;

            CassandraId mailboxId = (CassandraId) mailboxDeletion.getMailboxId();

            handleMailboxDeletion(mailboxId)
                .block();
        }
    }

    private Mono<Void> handleMailboxDeletion(CassandraId mailboxId) {
        return messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited())
            .map(ComposedMessageIdWithMetaData::getComposedMessageId)
            .concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getMessageId(), mailboxId)
                .then(imapUidDAO.delete((CassandraMessageId) metadata.getMessageId(), mailboxId))
                .then(messageIdDAO.delete(mailboxId, metadata.getUid())))
            .then(deleteAcl(mailboxId))
            .then(applicableFlagDAO.delete(mailboxId))
            .then(firstUnseenDAO.removeAll(mailboxId))
            .then(deletedMessageDAO.removeAll(mailboxId))
            .then(counterDAO.delete(mailboxId))
            .then(recentsDAO.delete(mailboxId));
    }

    private Mono<Void> handleMessageDeletion(Expunged expunged) {
        return Flux.fromIterable(expunged.getExpunged()
            .values())
            .map(MessageMetaData::getMessageId)
            .map(CassandraMessageId.class::cast)
            .concatMap(this::handleMessageDeletion)
            .then();
    }

    private Mono<Void> deleteAcl(CassandraId mailboxId) {
        return aclMapper.getACL(mailboxId)
            .flatMap(acl -> rightsDAO.update(mailboxId, ACLDiff.computeDiff(acl, MailboxACL.EMPTY)))
            .then(aclMapper.delete(mailboxId));
    }

    private Mono<Void> handleMessageDeletion(CassandraMessageId messageId) {
        return Mono.just(messageId)
            .filterWhen(this::isReferenced)
            .flatMap(id -> readMessage(id)
                .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
                .flatMap(this::deleteAttachmentMessageIds)
                .then(messageDAO.delete(messageId)));
    }

    private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, CassandraId excludedId) {
        return Mono.just(messageId)
            .filterWhen(id -> isReferenced(id, excludedId))
            .flatMap(id -> readMessage(id)
                .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
                .flatMap(this::deleteAttachmentMessageIds)
                .then(messageDAO.delete(messageId)));
    }

    private Mono<MessageRepresentation> readMessage(CassandraMessageId id) {
        return messageDAO.retrieveMessage(id, MessageMapper.FetchType.Metadata);
    }

    private Mono<Void> deleteUnreferencedAttachments(MessageRepresentation message) {
        return Flux.fromIterable(message.getAttachments())
            .filterWhen(attachment -> ownerDAO.retrieveOwners(attachment.getAttachmentId()).hasElements().map(negate()))
            .filterWhen(attachment -> hasOtherMessagesReferences(message, attachment))
            .concatMap(attachment -> attachmentDAO.delete(attachment.getAttachmentId()))
            .then();
    }

    private Mono<Void> deleteAttachmentMessageIds(MessageRepresentation message) {
        return Flux.fromIterable(message.getAttachments())
            .concatMap(attachment -> attachmentMessageIdDAO.delete(attachment.getAttachmentId(), message.getMessageId()))
            .then();
    }

    private Mono<Boolean> hasOtherMessagesReferences(MessageRepresentation message, MessageAttachmentRepresentation attachment) {
        return attachmentMessageIdDAO.getOwnerMessageIds(attachment.getAttachmentId())
            .filter(Predicate.not(Predicate.isEqual(message.getMessageId())))
            .hasElements()
            .map(negate());
    }

    private Mono<Boolean> isReferenced(CassandraMessageId id) {
        return imapUidDAO.retrieve(id, ALL_MAILBOXES)
            .hasElements()
            .map(negate());
    }

    private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId excludedId) {
        return imapUidDAO.retrieve(id, ALL_MAILBOXES)
            .filter(metadata -> !metadata.getComposedMessageId().getMailboxId().equals(excludedId))
            .hasElements()
            .map(negate());
    }
}
